package com.jie.flink.cdc.flinksink;

import com.jie.flink.cdc.flinksink.config.KafkaConfigProperties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

/**
 * @author zhanggj
 * @date 2023/5/29 10:18
 * @desc
 */
public class KafkaFlinkSinkBuilder implements FlinkSinkBuilder<KafkaConfigProperties> {
    private KafkaFlinkSinkBuilder() {
    }

    private static class InstanceHolder{
        private static KafkaFlinkSinkBuilder INSTANCE = new KafkaFlinkSinkBuilder();
    }
    public static KafkaFlinkSinkBuilder getInstance() {
        return InstanceHolder.INSTANCE;
    }

    @Override
    public Sink<String> buildSink(final KafkaConfigProperties sinkConfigProperties) {
        return  KafkaSink.<String>builder()
                .setBootstrapServers(sinkConfigProperties.getBootstrapServer())
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(sinkConfigProperties.getTopic())
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .setKafkaKeySerializer(KafkaKeySerializer.class)
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
